package cn.edu.zstu.qingzhu.qingzhushop.config;



import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.SimpleTimeZone;
import java.util.logging.SimpleFormatter;

/**
 * FileName: RabbitmqConfig
 *
 * @Author: LvYibin
 * @Date: 2020/7/18 10:32
 * @Description:
 */
@Configuration
public class CustomRabbitMQDirectConfig {
    //    private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
//    //自动装配RabbitMQ链接工厂实例
//    @Autowired
//    private CachingConnectionFactory connectionFactory;
//    //自动装配消息监听器所在的容器工厂的配置类实例
//    @Autowired
//    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
//    /**
//     * 下面为单一消费者实例配置
//     * @return
//     * */
//    @Bean(name = "singleListenerContainer")
//    public SimpleRabbitListenerContainerFactory listenerContainer(){
//        //定义监听器所在的容器工厂
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        //设置容器工厂所用的实例
//        factory.setConnectionFactory(connectionFactory);
//        //设置消息在传输中的格式，这里用json
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        //设置并发消费者实例的初始数量。这里为1个
//        factory.setConcurrentConsumers(1);
//        //设置并发消费者实例的最大数量。这里为1个
//        factory.setMaxConcurrentConsumers(1);
//        //设置并发消费者实例中每个实例拉取的消息数量-这里为1个
//        factory.setPrefetchCount(1);
//        return factory;
//    }
//
//    /**
//     * 下面为多个消费者实例配置,主要针对高并发的业务
//     * @return
//     * */
//    @Bean(name = "multiListenerContainer")
//    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
//        //定义监听器所在的容器工厂
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        //设置容器工厂所用的实例
//        factory.setConnectionFactory(connectionFactory);
//        //设置消息在传输中的格式，这里用json
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        //设置并发消费者实例的初始数量。这里为10个
//        factory.setConcurrentConsumers(10);
//        //设置并发消费者实例的最大数量。这里为15个
//        factory.setMaxConcurrentConsumers(15);
//        //设置并发消费者实例中每个实例拉取的消息数量-这里为10个
//        factory.setPrefetchCount(10);
//        return factory;
//    }
//
//    //自定义配置Rabbitmq发送消息的操作组件 RabbitTemplate
//    @Bean
//    public RabbitTemplate rabbitTemplate(){
//        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//        connectionFactory.setPublisherReturns(true);
//
//        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//        rabbitTemplate.setMandatory(true);
//        //发送消息后，如果发送成功，则输出“信息发送成功”的反馈信息
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                log.info("消息发送成功：correlationData({}), ack({}),cause({})",correlationData,ack,cause);
//            }
//        });
//
//        //发送消息后，如果发送失败，则输出“信息发送失败-消息丢失”的反馈信息
//        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//            @Override
//            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//                log.info("消息丢失：exchange({}),route({}),replyCode({}),replyText({}),message:{}",
//                        exchange,routingKey,replyCode,replyText,message);
//            }
//        });
//        return rabbitTemplate;
//    }
//
//    //创建队列，交换机和路由并绑定
//    @Autowired
//    private Environment env;
//    //创建队列
//    @Bean(name = "basicQueue")
//    public Queue basicQueue(){
//        return new Queue(env.getProperty("mq.basic.info.queue.name"),true);
//    }
//    //创建交换机
//    @Bean
//    public DirectExchange basicExchange(){
//        return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false);
//    }
//    //创建绑定
//    @Bean
//    public Binding basicBinding(){
//        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name"));
//    }
    private static final Logger log= LoggerFactory.getLogger(CustomRabbitMQDirectConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 单一消费者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        return factory;
    }

    /**
     * 多个消费者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(15);
        factory.setPrefetchCount(10);
        return factory;
    }
    /**
     * RabbitMQ发送消息的操作组件实例
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }

    //定义读取配置文件的环境变量实例
    @Autowired
    private Environment env;

    /**创建简单消息模型：队列、交换机和路由 **/

    //创建队列
    @Bean
    public Queue basicQueue(){
        return new Queue("queue1",true);
    }

    //创建交换机：在这里以DirectExchange为例，在后面章节中我们将继续详细介绍这种消息模型
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange("exchange1",true,false);
    }

    //创建绑定
    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with("routing:key:1");
    }

    //创建死信队列
    @Bean
    public Queue basicDeadQueue(){
        Map<String,Object> args = new HashMap();
        //创建死信交换机
        args.put("x-dead-letter-exchange","dead.exchange");
        args.put("x-dead-letter-routing-key","dead.routing.key");
        args.put("x-message-ttl",100000);
        return new Queue("dead.queue",true,false,false,args);
    }

    @Bean
    public TopicExchange basicExchange1(){
        return new TopicExchange("topic:exchange1",true,false);
    }

    @Bean
    public Binding basicBinding1(){
        return BindingBuilder.bind(basicDeadQueue()).to(basicExchange1()).with("routing:key:2");
    }

    @Bean
    public Queue basicQueue1(){
        return new Queue("queue2",true);
    }

    //创建交换机：在这里以DirectExchange为例，在后面章节中我们将继续详细介绍这种消息模型
    @Bean
    public TopicExchange basicExchange2(){
        return new TopicExchange("dead.exchange",true,false);
    }

    //创建绑定
    @Bean
    public Binding deadbasicBinding(){
        return BindingBuilder.bind(basicQueue1()).to(basicExchange2()).with("dead.routing.key");
    }

    //创建秒杀死信队列
    @Bean
    public Queue basicSecKillDeadQueue(){
        Map<String,Object> args = new HashMap();
        //创建死信交换机
        args.put("x-dead-letter-exchange","seckill.dead.exchange");
        args.put("x-dead-letter-routing-key","seckill.dead.routing.key");
        args.put("x-message-ttl",86400000);
        return new Queue("seckill.dead.queue",true,false,false,args);
    }

    @Bean
    public TopicExchange secKillBasicExchange1(){
        return new TopicExchange("topic:seckill:exchange1",true,false);
    }

    @Bean
    public Binding secKillBasicBinding1(){
        return BindingBuilder.bind(basicSecKillDeadQueue()).to(secKillBasicExchange1()).with("routing:key:seckill");
    }

    @Bean
    public Queue basicSecKillQueue1(){
        return new Queue("queue.seckill",true);
    }

    //创建交换机：在这里以DirectExchange为例，在后面章节中我们将继续详细介绍这种消息模型
    @Bean
    public TopicExchange basicSecKillExchange2(){
        return new TopicExchange("seckill.dead.exchange",true,false);
    }

    //创建绑定
    @Bean
    public Binding deadbasicSecKillBinding(){
        return BindingBuilder.bind(basicSecKillQueue1()).to(basicSecKillExchange2()).with("seckill.dead.routing.key");
    }

}
